/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2021-2021. All rights reserved.
 */

package autodispose;

import autodispose.observers.AutoDisposingObserver;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.observers.DisposableCompletableObserver;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * AutoDisposingObserverImpl class.
 *
 * @param <T>
 */
final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDisposingObserver<T> {
    @SuppressWarnings("WeakerAccess") // Package private for synthetic accessor saving
    final AtomicReference<Disposable> mainDisposable = new AtomicReference<>();

    @SuppressWarnings("WeakerAccess") // Package private for synthetic accessor saving
    final AtomicReference<Disposable> scopeDisposable = new AtomicReference<>();

    private final AtomicThrowable error = new AtomicThrowable();
    private final CompletableSource scope;
    private final Observer<? super T> delegate;

    AutoDisposingObserverImpl(CompletableSource scope, Observer<? super T> delegate) {
        this.scope = scope;
        this.delegate = delegate;
    }

    @Override
    public Observer<? super T> delegateObserver() {
        return delegate;
    }

    @Override
    public void onSubscribe(final Disposable d) {
        DisposableCompletableObserver o =
                new DisposableCompletableObserver() {
                    @Override
                    public void onError(Throwable e) {
                        scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
                        AutoDisposingObserverImpl.this.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
                        AutoDisposableHelper.dispose(mainDisposable);
                    }
                };
        if (AutoDisposeEndConsumerHelper.setOnce(scopeDisposable, o, getClass())) {
            delegate.onSubscribe(this);
            scope.subscribe(o);
            AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
        }
    }

    @Override
    public boolean isDisposed() {
        return mainDisposable.get() == AutoDisposableHelper.DISPOSED;
    }

    @Override
    public void dispose() {
        AutoDisposableHelper.dispose(scopeDisposable);
        AutoDisposableHelper.dispose(mainDisposable);
    }

    @Override
    public void onNext(T value) {
        if (!isDisposed()) {
            if (HalfSerializer.onNext(delegate, value, this, error)) {
                // Terminal event occurred and was forwarded to the delegate, so clean up here
                mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
                AutoDisposableHelper.dispose(scopeDisposable);
            }
        }
    }

    @Override
    public void onError(Throwable e) {
        if (!isDisposed()) {
            mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
            AutoDisposableHelper.dispose(scopeDisposable);
            HalfSerializer.onError(delegate, e, this, error);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
            AutoDisposableHelper.dispose(scopeDisposable);
            HalfSerializer.onComplete(delegate, this, error);
        }
    }
}
